/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.io.compress;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.utils.ByteBufferUtil;

import static org.junit.Assert.assertEquals;

/**
 * Tests built around {@link BadCompressor}, a compressor whose "compressed" output is twice the
 * size of its input.
 *
 * The {@code badCompressor*} tests check the compressor in isolation; {@link #compressionEnabled()}
 * uses it as the table compressor, writes two sstables and runs a major compaction with the sstable
 * preemptive open interval set to 1 MiB.
 */
public class CompressedSequentialWriterReopenTest extends CQLTester
{
    /** Number of rows written before each flush in {@link #compressionEnabled()}. */
    private static final int ROWS_PER_SSTABLE = 10000;

    /** Size in bytes of the blob stored in every row. */
    private static final int BLOB_SIZE = 1000;

    /** Length of the sequential input used by the direct compressor tests. */
    private static final int SAMPLE_LENGTH = 20;

    /**
     * The array-based uncompress must copy the first half of the input and report that length.
     */
    @Test
    public void badCompressor1() throws IOException
    {
        BadCompressor bad = new BadCompressor();
        byte[] test = sequentialBytes(SAMPLE_LENGTH);
        byte[] out = new byte[10];
        int written = bad.uncompress(test, 0, SAMPLE_LENGTH, out, 0);
        assertEquals(10, written);
        for (int i = 0; i < 10; i++)
            assertEquals((byte) i, out[i]);
    }

    /**
     * The buffer-based compress must write the input twice and consume the whole input.
     */
    @Test
    public void badCompressor2() throws IOException
    {
        BadCompressor bad = new BadCompressor();
        ByteBuffer input = ByteBuffer.wrap(sequentialBytes(SAMPLE_LENGTH));
        ByteBuffer output = ByteBuffer.allocate(40);
        bad.compress(input, output);
        assertEquals(0, input.remaining());
        assertEquals(40, output.position());
        for (int i = 0; i < 40; i++)
            assertEquals(i % 20, output.get(i));
    }

    /**
     * The buffer-based uncompress must write the first half of the input and consume the whole input.
     */
    @Test
    public void badCompressor3() throws IOException
    {
        BadCompressor bad = new BadCompressor();
        ByteBuffer input = ByteBuffer.wrap(sequentialBytes(SAMPLE_LENGTH));
        ByteBuffer output = ByteBuffer.allocate(10);
        bad.uncompress(input, output);
        assertEquals(0, input.remaining());
        assertEquals(10, output.position());
        for (int i = 0; i < 10; i++)
            assertEquals(i, output.get(i));
    }

    /**
     * An input buffer whose position is not zero must still be consumed up to its limit, and only
     * the bytes between position and limit may be written out.
     */
    @Test
    public void badCompressorHonoursInputPosition() throws IOException
    {
        BadCompressor bad = new BadCompressor();
        int skipped = 4;
        int remaining = SAMPLE_LENGTH - skipped;

        ByteBuffer input = ByteBuffer.wrap(sequentialBytes(SAMPLE_LENGTH));
        input.position(skipped);
        ByteBuffer compressed = ByteBuffer.allocate(remaining * 2);
        bad.compress(input, compressed);
        assertEquals(0, input.remaining());
        assertEquals(remaining * 2, compressed.position());
        for (int i = 0; i < remaining * 2; i++)
            assertEquals(skipped + i % remaining, compressed.get(i));

        ByteBuffer source = ByteBuffer.wrap(sequentialBytes(SAMPLE_LENGTH));
        source.position(skipped);
        ByteBuffer uncompressed = ByteBuffer.allocate(remaining / 2);
        bad.uncompress(source, uncompressed);
        assertEquals(0, source.remaining());
        assertEquals(remaining / 2, uncompressed.position());
        for (int i = 0; i < remaining / 2; i++)
            assertEquals(skipped + i, uncompressed.get(i));
    }

    /**
     * Writes two sstables with {@link BadCompressor} as the table compressor, then major-compacts
     * them with the sstable preemptive open interval set to 1 MiB. The test passes if nothing throws.
     */
    @Test
    public void compressionEnabled() throws Throwable
    {
        createTable("create table %s (id int primary key, t blob) with compression = {'class':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}");
        byte[] blob = new byte[BLOB_SIZE];
        (new Random()).nextBytes(blob);
        // Keep both sstables around until the explicit major compaction below.
        getCurrentColumnFamilyStore().disableAutoCompaction();
        insertRowsAndFlush(blob);
        insertRowsAndFlush(blob);
        // NOTE(review): this is set through a static DatabaseDescriptor call and is not restored
        // afterwards - confirm no other test sharing the JVM relies on the previous value.
        DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(1);
        getCurrentColumnFamilyStore().forceMajorCompaction();
    }

    /** Inserts {@link #ROWS_PER_SSTABLE} rows with ids 0..n-1 carrying {@code blob}, then flushes. */
    private void insertRowsAndFlush(byte[] blob) throws Throwable
    {
        for (int i = 0; i < ROWS_PER_SSTABLE; i++)
        {
            execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
        }
        flush();
    }

    /** Returns a new array of the given length holding the values 0, 1, 2, ... in order. */
    private static byte[] sequentialBytes(int length)
    {
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++)
            bytes[i] = (byte) i;
        return bytes;
    }

    /**
     * Test-only compressor that "compresses" by writing its input twice and "uncompresses" by
     * keeping the first half of its input, so compressed data is always larger than the original.
     */
    public static class BadCompressor implements ICompressor
    {
        /**
         * Factory method; the class is referenced by name in the table's compression options.
         *
         * @param options compression options, ignored
         * @return a new {@code BadCompressor}
         */
        public static ICompressor create(Map<String, String> options)
        {
            return new BadCompressor();
        }

        /**
         * @param chunkLength uncompressed chunk length
         * @return twice {@code chunkLength}, matching the doubling done by {@link #compress}
         */
        @Override
        public int initialCompressedBufferLength(int chunkLength)
        {
            return chunkLength * 2;
        }

        /**
         * Copies the first half of the given input range into {@code output}.
         *
         * @return the number of bytes written, {@code inputLength / 2}
         */
        @Override
        public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
        {
            int uncompressedLength = inputLength / 2;
            System.arraycopy(input, inputOffset, output, outputOffset, uncompressedLength);
            return uncompressedLength;
        }

        /**
         * Writes the bytes between {@code input}'s position and limit to {@code output} twice and
         * marks the input as fully consumed.
         */
        @Override
        public void compress(ByteBuffer input, ByteBuffer output) throws IOException
        {
            byte[] arr = ByteBufferUtil.getArray(input);
            output.put(arr);
            output.put(arr);
            // Advance to the limit rather than to the byte count: the two only coincide when the
            // input started at position 0.
            input.position(input.limit());
        }

        /**
         * Writes the first half of the bytes between {@code input}'s position and limit to
         * {@code output} and marks the input as fully consumed.
         */
        @Override
        public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
        {
            byte[] arr = ByteBufferUtil.getArray(input);
            output.put(arr, 0, arr.length / 2);
            // Advance to the limit rather than to the byte count: the two only coincide when the
            // input started at position 0.
            input.position(input.limit());
        }

        /** @return {@link BufferType#ON_HEAP} */
        @Override
        public BufferType preferredBufferType()
        {
            return BufferType.ON_HEAP;
        }

        /** @return {@code true} for every buffer type */
        @Override
        public boolean supports(BufferType bufferType)
        {
            return true;
        }

        /** @return an empty set: this compressor accepts no options */
        @Override
        public Set<String> supportedOptions()
        {
            return Collections.emptySet();
        }
    }
}
